
perf(shuffles): Incrementally retrieve metadata in reduce #3545

Merged: 7 commits merged into main on Jan 20, 2025

Conversation

@colin-ho (Contributor) commented Dec 11, 2024

Incrementally retrieve partitions and metadata as fanouts are completed, instead of retrieving them only after all are completed. This drastically speeds up large partition shuffles.

Before, we would see pauses between the map and reduce phases (the example below is a 1000 x 1000 x 100MB shuffle):

[Screenshot: before, 2024-12-10 5:14 PM]

Now:

[Screenshot: after, 2024-12-10 5:14 PM]
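
For context, a minimal illustrative sketch (not the actual Daft code) of the pattern this PR moves to: instead of a single blocking `ray.get` over all fanout metadata at the end of the map phase, resolve each fanout's metadata as soon as its task finishes.

```python
import ray

def collect_metadata_incrementally(metadata_refs: list[ray.ObjectRef]) -> list:
    """Resolve metadata refs as their fanout tasks finish, instead of
    blocking on ray.get(metadata_refs) only after every fanout is done."""
    pending = list(metadata_refs)
    collected = []
    while pending:
        # Wait for at least one fanout to finish, then fetch just its metadata.
        ready, pending = ray.wait(pending, num_returns=1)
        collected.extend(ray.get(ready))  # small metadata objects; cheap to fetch
    return collected
```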

github-actions bot added the perf label on Dec 11, 2024

codspeed-hq bot commented Dec 11, 2024

CodSpeed Performance Report

Merging #3545 will degrade performance by 13.84%

Comparing colin/incremental-metadata-retrieval (f15c7e5) with main (6157ad8)

Summary

❌ 1 regression
✅ 26 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

| Benchmark | main | colin/incremental-metadata-retrieval | Change |
| --- | --- | --- | --- |
| test_iter_rows_first_row[100 Small Files] | 192.6 ms | 223.5 ms | -13.84% |


codecov bot commented Dec 11, 2024

Codecov Report

Attention: Patch coverage is 96.42857% with 1 line in your changes missing coverage. Please review.

Project coverage is 77.17%. Comparing base (c932ec9) to head (f15c7e5).
Report is 23 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| daft/execution/execution_step.py | 96.15% | 1 Missing ⚠️ |
Additional details and impacted files

Impacted file tree graph

```
@@            Coverage Diff             @@
##             main    #3545      +/-   ##
==========================================
- Coverage   78.06%   77.17%   -0.90%
==========================================
  Files         728      728
  Lines       89967    91917    +1950
==========================================
+ Hits        70236    70936     +700
- Misses      19731    20981    +1250
```
| Files with missing lines | Coverage Δ |
| --- | --- |
| daft/execution/physical_plan.py | 94.35% <100.00%> (ø) |
| daft/execution/execution_step.py | 89.63% <96.15%> (+0.19%) ⬆️ |

... and 43 files with indirect coverage changes

@colin-ho colin-ho requested a review from jaychia December 12, 2024 08:15
@jaychia (Contributor) commented Dec 12, 2024

Interesting. One suggestion I have is to see if we can use `ray.wait([...metadatas], fetch_local=True)` (see: https://docs.ray.io/en/latest/ray-core/api/doc/ray.wait.html).

I wonder if this might simplify the logic here, and it might also fix some interesting OOM issues I was observing, where the workers holding the metadata objectrefs were dying due to OOM.

Could we try that and see if it gives us the same effect? We'd have to figure out where it is appropriate to call this on the metadata objectrefs, though. I would look into `_await_tasks` on our RayRunner.
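
For illustration, a hedged sketch of what calling `ray.wait` with `fetch_local=True` on the metadata objectrefs could look like (the helper name is made up; `fetch_local` is a real `ray.wait` parameter):

```python
import ray

def prefetch_metadata(metadata_refs: list[ray.ObjectRef]) -> None:
    """Block until all metadata objects are ready and pulled to this node,
    so later ray.get calls on them are local and cheap."""
    ray.wait(
        metadata_refs,
        num_returns=len(metadata_refs),  # wait for all of them
        fetch_local=True,                # also fetch the objects to the local object store
    )
```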

@jaychia (Contributor) commented Dec 12, 2024

Another idea... Looks like these metadatas are being retrieved using a Ray remote function:

```python
@ray.remote
def get_metas(*partitions: MicroPartition) -> list[PartitionMetadata]:
    return [PartitionMetadata.from_table(partition) for partition in partitions]
```

My guess is that if the MicroPartition lives in the object store, we're having to page this data back in just to grab metadata. This is even worse if spilling occurred, where we may have to page it back in from disk.

I wonder if there's a better way of dealing with this.

Edit: I think this might only be triggered from certain codepaths, and in most cases the metadata is returned as an objectref after execution. This code is so messy...
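
To make that alternative concrete, here is an illustrative sketch (with a stand-in metadata type, not Daft's `PartitionMetadata` or its task interface) of a task that returns metadata as a separate object ref alongside the partition, so the driver never has to pull the partition back just for row counts:

```python
from dataclasses import dataclass
import ray

@dataclass
class PartitionMeta:  # stand-in for Daft's PartitionMetadata
    num_rows: int

@ray.remote(num_returns=2)
def map_task(rows: list[dict]) -> tuple[list[dict], PartitionMeta]:
    # ... real map work would happen here ...
    return rows, PartitionMeta(num_rows=len(rows))

# The metadata ref resolves without touching the (potentially spilled) partition data.
data_ref, meta_ref = map_task.remote([{"a": 1}, {"a": 2}])
print(ray.get(meta_ref).num_rows)  # 2
```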

@jaychia (Contributor) commented Dec 12, 2024

Doing some experiments in #3557, but ray.wait(fetch_local=True) doesn't seem to do anything for the metadata in my case (I still hit OOMs). I'm guessing that for small objects like the metadata this doesn't actually work, and it only works for things in the object store?

Doing another round of tests now using an explicit ray.get and trying to explicitly set the cached metadata object in the result. If that works, then it could work well for your use-case also.
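
A rough sketch of the "explicit ray.get plus cached metadata on the result" idea (the `MaterializedResult` class and its attributes here are assumptions for illustration, not Daft's actual interfaces):

```python
from typing import Any, Optional
import ray

class MaterializedResult:  # hypothetical stand-in for a runner's result wrapper
    def __init__(self, partition_ref: ray.ObjectRef, metadata_ref: ray.ObjectRef):
        self.partition_ref = partition_ref
        self.metadata_ref = metadata_ref
        self._cached_metadata: Optional[Any] = None

    def metadata(self) -> Any:
        # Explicit, blocking fetch the first time; cached thereafter so the
        # worker holding the metadata objectref is no longer load-bearing.
        if self._cached_metadata is None:
            self._cached_metadata = ray.get(self.metadata_ref)
        return self._cached_metadata
```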

@jaychia (Contributor) left a comment

So this PR looks good to me, but I wonder if we should take a more holistic approach here and maybe just fix how metadata is being passed around the system rather than patch it here specifically for the map/reduce shuffle.

Maybe something really dumb like hooking into a stats/metadata actor? Would love to get thoughts from @samster25

@colin-ho (Contributor, Author) commented

> Doing another round of tests now using an explicit ray.get and trying to explicitly set the cached metadata object in the result. If that works, then it could work well for your use-case also.

Tried this out: essentially, always fetch metadata and cache it upon partition task completion, unless it is the final partition task.

Tried this on a few shuffle configurations (1000 x 1000, 2000 x 2000), and it works pretty well, roughly 20s faster.

This could be viable? As long as a task is not final, its partition metadata will need to be retrieved anyway for subsequent tasks, so we might as well retrieve it as soon as the task completes. Since it is a blocking call, we want to pipeline it with running tasks, which should work if we retrieve it while awaiting tasks.
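
A sketch of how that pipelining could look in a task-awaiting loop; `cache_metadata_on_done` mirrors the flag this PR adds, while the task and result objects are hypothetical (e.g. a result wrapper like the one sketched above):

```python
import ray

def await_tasks(inflight_tasks: dict) -> None:
    """Hypothetical scheduler loop: as each task finishes, eagerly fetch and
    cache its metadata so the blocking fetch overlaps with tasks still running."""
    pending = {task.partition_ref: task for task in inflight_tasks.values()}
    while pending:
        ready, _ = ray.wait(list(pending.keys()), num_returns=1)
        task = pending.pop(ready[0])
        if task.cache_metadata_on_done:  # skip for final tasks whose metadata may never be needed
            task.result.metadata()       # blocking, but pipelined with the remaining tasks
```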


@colin-ho (Contributor, Author) commented

Made a refactor to have the metadata caching logic on the scheduler instead of on the reduce op, ptal! @jaychia

@colin-ho colin-ho requested a review from jaychia January 15, 2025 18:23
@colin-ho (Contributor, Author) commented

Tested on TPCH SF 1000 on an 8 x i8.4xlarge node cluster.

| Query | This PR | Main | Difference | % Change |
| --- | --- | --- | --- | --- |
| Q1 | 20.48 | 20.55 | -0.07 | -0.34% |
| Q2 | 37.70 | 39.41 | -1.71 | -4.34% |
| Q3 | 27.53 | 27.53 | 0.00 | 0.00% |
| Q4 | 17.22 | 17.34 | -0.12 | -0.69% |
| Q5 | 115.50 | 120.19 | -4.69 | -3.90% |
| Q6 | 8.05 | 6.75 | 1.30 | 19.26% |
| Q7 | 70.65 | 68.93 | 1.72 | 2.50% |
| Q8 | 88.19 | 89.56 | -1.37 | -1.53% |
| Q9 | 197.09 | 206.90 | -9.81 | -4.74% |
| Q10 | 45.81 | 44.90 | 0.91 | 2.03% |

Not much difference

@jaychia (Contributor) left a comment

The idea sounds good to me, but I'm wondering if we should just make it always retrieve metadata -- does that affect performance?

```diff
@@ -1771,7 +1771,7 @@ def __iter__(self) -> MaterializedPhysicalPlan:
         try:
             step = next(self.child_plan)
             if isinstance(step, PartitionTaskBuilder):
-                step = step.finalize_partition_task_single_output(stage_id=stage_id)
+                step = step.finalize_partition_task_single_output(stage_id=stage_id, cache_metadata_on_done=False)
```
@jaychia (Contributor) commented on the diff

How cheap is this, and should we just always do this perhaps?

I believe a fetch is also going to happen if a user ever tries to display/show a dataframe, since it needs to figure out how large the entire dataframe is using the total row count. If that's the case, maybe we just simplify the logic here and always force a fetch?

@colin-ho (Contributor, Author) replied

I think doing this on the final step may introduce some regressions if the result set of a .collect is very large and metadata is not required, i.e. if the dataframe is not displayed, or perhaps it is an intermediate collect step. If it is a show then it should be quite cheap and this eager fetching probably won't help very much.

@colin-ho merged commit 4b8397b into main on Jan 20, 2025
42 of 43 checks passed
@colin-ho deleted the colin/incremental-metadata-retrieval branch on January 20, 2025 at 22:29